package com.edclol.apitest.tableapi;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


/**
 * Demo job that registers a Kafka-backed table through a SQL {@code CREATE TABLE}
 * statement, prints the table schema, and then prints every row of the table as an
 * append stream.
 *
 * <p>NOTE(review): the DDL uses the {@code 'connector' = 'kafka'} / {@code 'value.format'}
 * option style while the file imports {@code StreamTableEnvironment} from the
 * {@code org.apache.flink.table.api.java} package. These may belong to different Flink
 * releases; confirm against the Flink version this project builds with.
 */
public class TableTest4_KafkaPipeLine2_1 {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if table registration or job execution fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Connect to Kafka and read data
        //
        // The option list inside WITH (...) must not end with a comma. The last active
        // option therefore carries no trailing ','. To re-enable one of the commented-out
        // options below, append ',' to the option that precedes it.
        String sql =
                "CREATE TABLE inputTable (\n" +
                "  `k_version` INT,\n" +
                "  `k_user_id` BIGINT,\n" +
                "  `k_item_id` BIGINT,\n" +
                "  `version` INT,\n" +
                "  `behavior` STRING\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                " 'topic' = 'json1'," +
                "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset'," +
//                "  'key.fields' = 'data;;k_item_id',\n" +
                "  'value.format' = 'json'\n" +
//                "  'value.fields-include' = 'EXCEPT_KEY'\n" +
                ")";
        tableEnv.sqlUpdate(sql);

        Table inputTable = tableEnv.from("inputTable");
        inputTable.printSchema();
        tableEnv.toAppendStream(inputTable, Row.class).print();

      /*  Disabled example, kept for reference only (uses the descriptor-based connect API).
        // 3. Query transformations
        // Simple transformation
        Table sensorTable = tableEnv.from("inputTable");
        Table resultTable = sensorTable.select("id, temp")
                .filter("id === 'sensor_6'");

        // Aggregation
        Table aggTable = sensorTable.groupBy("id")
                .select("id, id.count as count, temp.avg as avgTemp");

        // 4. Create a Kafka connection and write the result to a different topic
        tableEnv.connect(new Kafka()
                .version("0.11")
                .topic("sinktest")
                .property("zookeeper.connect", "localhost:2181")
                .property("bootstrap.servers", "localhost:9092")
        )
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("id", DataTypes.STRING())
//                        .field("timestamp", DataTypes.BIGINT())
                        .field("temp", DataTypes.DOUBLE())
                )
                .createTemporaryTable("outputTable");
                resultTable.insertInto("outputTable");
                */

        // Submit the job; without this call the pipeline defined above never runs.
        env.execute();
    }
}
